const config = require('./config.js')
const package = require('./package.json')
// 输出端口地址
console.log("webredis start on:%d", config.PORT)
// 输出redis 地址
console.log("redis localtion:%s", config.REDIS_URL)
var redis = require('redis');
var http = require('http');

var socketio = require('socket.io');

var server = http.createServer(function (req, res) {
  res.writeHead(200, {
    'Content-Type': 'text/plain'
  });
  // 显示项目名称及版本号
  res.end(package.name + ' ' + package.version + ' ' + package.repository.url);

}).listen(config.PORT);

// 每一个websocket代表一个浏览器连接，使用socket.id为key保存频道订阅数据和socket实例
// 保存所有连接的socket
// socket.id=>socket
const WEB_SOCKETS = new Map()
// websocket订阅的频道集合
// socket.id=>Set<String>
const SUBSCRIBES = new Map()

var clent_error;
// 公用redis client实例
var client = redis.createClient(config.REDIS_URL)
client.on('error', function (error) {
  clent_error = error;
  console.log(error);
});
var sub_error;
// 只用于消息订阅subscribe的redis客户端,需要独占链接,全局变量
var sub = redis.createClient(config.REDIS_URL)
  .on("subscribe", function (channel, count) {
    /** 命令输出频道订阅日志 */
    console.log(count + ' subscribe: ' + channel);
  })
  .on('message', function (channel, message) {
    /** 
     * 分发订阅频道的收到的消息到订阅频道的连接
     * 遍历每个socket订阅的频道，检查当前消息的频道是在该socket订阅的频道名集合中，
     * 如果是就向该socket推送消息
     */
    for (var [socketid, channels] of SUBSCRIBES) {
      // 如果socket订阅了消息频道则推送消息    
      if (channels.has(channel)) {
        WEB_SOCKETS.get(socketid).emit(
          'message',
          {
            channel: channel,
            data: message
          })
      }
    }
  })
sub.on('error', function (error) {
  sub_error = error;
  console.log(error);
});

var io = socketio(server)
io.on('connection', function (socket) {
  console.log('%s connect', socket.id)
  // 将socket对象保存到全局映射表中
  WEB_SOCKETS.set(socket.id, socket)
  // 初始化订阅频道表
  SUBSCRIBES.set(socket.id, new Set())
  /** 频道订阅  */
  socket.on('subscribe', function (channels) {
    var ack = {
      success: false,
      request: 'subscribe',
      reply: null,
      respone: 'OK'
    }
    try {
      if (typeof channels === 'string') {
        channels = [channels]
      }
      if (!(channels instanceof Array)) {
        ack.respone = 'channels is NOT ARRAY'
        socket.emit('ack', ack)
        return
      } else if (sub.ready === false || sub.stream.writable === false) {
        ack.respone = sub_error.message
        socket.emit('ack', ack)
      } else if (channels.length > 0) {

        sub.subscribe(channels, (err, res) => {
          if (err) {
            ack.respone = err.message
          } else {
            ack.success = true
            ack.respone = '订阅成功'
            ack.reply = res
            saveChannels(socket, channels);
            socket.emit('ack', ack)
          }
        })

      } else {
        ack.respone = '频道列表为空'
        socket.emit('ack', ack)
      }
    } catch (error) {
      ack.respone = err.message
      socket.emit('ack', ack)
    }
  })
  /** 取消订阅 */
  socket.on('unsubscribe', function (channels) {
    var ack = {
      success: false,
      request: 'unsubscribe',
      reply: null,
      respone: 'OK'
    }
    try {
      if (typeof channels === 'string') {
        channels = [channels]
      }
      if (!(channels instanceof Array)) {
        ack.respone = 'channels is NOT ARRAY'
        socket.emit('ack', ack)
      } else if (sub.ready === false || sub.stream.writable === false) {
        ack.respone = sub_error.message
        socket.emit('ack', ack)
      } else if (channels.length > 0) {

        client.unsubscribe(channels, (err, res) => {
          if (err) {
            ack.respone = error.message
          } else {
            ack.success = true
            ack.respone = '取消订阅成功'
            ack.reply = res
            removeChannels(socket, channels);
          }
          socket.emit('ack', ack)
        })

      } else {
        ack.respone = '频道列表为空'
        socket.emit('ack', ack)
      }
    } catch (error) {
      ack.respone = error.message
      socket.emit('ack', ack)
    }
  })

  /** 发布消息 */
  socket.on('publish', (channel, message) => {
    var ack = {
      success: false,
      request: 'publish',
      reply: null,
      respone: 'OK'
    }
    try {
      if(!channel || !message){
        ack.respone = 'NULL INPUT (channel or message)'
        socket.emit('ack', ack)
      }else if (sub.ready === false || sub.stream.writable === false) {
        ack.respone = sub_error.message
        socket.emit('ack', ack)
      } else {

        client.publish(channel, message, (err, res) => {
          if (err) {
            ack.respone = error.message
          } else {
            ack.success = true
            ack.respone = channel + '发布成功'
            ack.reply = res
          }
          socket.emit('ack', ack)
        });

      }
    } catch (error) {
      ack.respone = error.message
      socket.emit('ack', ack)
    }

  });

  /** 关闭websocket连接 */
  socket.on('disconnect', function (reason) {
    console.log('%s disconnect %s', socket.id, reason)
    WEB_SOCKETS.delete(socket.id)
    SUBSCRIBES.delete(socket.id)
  })
});


// 删除socket订阅的频道的记录
function removeChannels(socket, channels) {
  var set = SUBSCRIBES.get(socket.id);
  channels.forEach(element => {
    set.delete(element);
  });
}

// 记录每个socket订阅的频道
function saveChannels(socket, channels) {
  var set = SUBSCRIBES.get(socket.id);
  if(set){
	  channels.forEach(element => {
		set.add(element);
	  });
  }else{	
	console.log("ERROR:INVALID socket!!!")
  }
}

